package golib

import (
	"time"

	rds "github.com/go-redis/redis"
)

// DelayQueue is a delayed-delivery queue kept in a Redis sorted set.
//
// Client is the Redis connection used for every operation. QueueName is the
// sorted-set key; NewDelayQueue sets it to "delay_queue:" plus the name it is
// given. Entries are scored with the Unix time (in seconds) at which they
// become due, and Consumer only picks up entries whose score is not in the
// future.
type DelayQueue struct {
	Client    *rds.Client
	QueueName string
}

// Callback handles one entry that Consumer has claimed from the queue. It
// reports whether handling succeeded: on true the entry is recorded under the
// queue's ":success" key, on false under its ":fail" key.
type Callback func(rds.Z) bool

// NewDelayQueue returns a DelayQueue that talks to Redis through redisClient.
// The sorted-set key is queueName prefixed with "delay_queue:".
func NewDelayQueue(redisClient *rds.Client, queueName string) *DelayQueue {
	queue := new(DelayQueue)
	queue.Client = redisClient
	queue.QueueName = "delay_queue:" + queueName
	return queue
}

// Push adds value to the queue so that it becomes due delay seconds from now.
//
// The entry's score is the current Unix time in seconds plus delay. The
// outcome of the Redis command is not inspected, so a failed insert is not
// reported to the caller.
func (delayQueue DelayQueue) Push(value interface{}, delay float64) {
	dueAt := Float64(time.Now().Unix()) + delay
	entry := rds.Z{Member: value, Score: dueAt}
	delayQueue.Client.ZAdd(delayQueue.QueueName, entry)
}

// Consumer polls the queue forever, claiming due entries and passing each one
// to callback. It never returns.
//
// The optional first element of param is the maximum number of entries fetched
// per poll; it defaults to 1. Every fetched entry is tried in score order. An
// entry is claimed by removing it from the sorted set: only the consumer whose
// removal actually deleted the member processes it, so several consumers can
// run against the same queue. A claimed entry is first recorded under the
// ":consumed" key, then under ":success" or ":fail" depending on callback's
// result.
//
// When nothing is due, or the query fails, the loop sleeps for three seconds
// before polling again.
func (delayQueue DelayQueue) Consumer(callback Callback, param ...int64) {
	num := int64(1)
	// Check the length, not nil-ness: an empty but non-nil variadic slice
	// (Consumer(cb, []int64{}...)) would otherwise panic on param[0].
	if len(param) > 0 {
		num = param[0]
	}
	for {
		now := time.Now().Unix()
		list, err := delayQueue.Client.ZRangeByScoreWithScores(delayQueue.QueueName, rds.ZRangeBy{Min: "-inf", Max: String(now), Offset: 0, Count: num}).Result()
		if err != nil || len(list) == 0 {
			time.Sleep(3 * time.Second)
			continue
		}
		// Walk the whole batch instead of only its head so that a batch size
		// above 1 is actually used and a lost race on one entry falls through
		// to the next one without another round trip.
		for _, data := range list {
			removed, remErr := delayQueue.Client.ZRem(delayQueue.QueueName, data.Member).Result()
			if remErr != nil || removed == 0 {
				// Removal failed or another consumer already claimed this entry.
				continue
			}
			// Removal succeeded, so this consumer owns the entry.
			delayQueue.consumed(data)
			if callback(data) {
				delayQueue.success(data)
			} else {
				delayQueue.fail(data)
			}
		}
	}
}

// consumed records data in the sorted set stored at QueueName + ":consumed",
// keeping its original score. It reports whether the Redis command completed
// without error.
func (delayQueue DelayQueue) consumed(data rds.Z) bool {
	key := delayQueue.QueueName + ":consumed"
	return delayQueue.Client.ZAdd(key, data).Err() == nil
}

// fail records data in the sorted set stored at QueueName + ":fail", keeping
// its original score. It reports whether the Redis command completed without
// error.
func (delayQueue DelayQueue) fail(data rds.Z) bool {
	key := delayQueue.QueueName + ":fail"
	return delayQueue.Client.ZAdd(key, data).Err() == nil
}

// success records data in the sorted set stored at QueueName + ":success",
// keeping its original score, as a log of entries the callback handled
// successfully. It reports whether the Redis command completed without error.
func (delayQueue DelayQueue) success(data rds.Z) bool {
	key := delayQueue.QueueName + ":success"
	return delayQueue.Client.ZAdd(key, data).Err() == nil
}
